MySQLからRedshiftへLoadするタスクを作ってみた。その2(S3 → Redshift) | Luigi Advent Calendar 2016 #24
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』24日目の内容となります。
前日に引き続きMySQLからRedhisftに取り込むタスクを実装していきます。
先日23日目はMySQLからRedshiftへLoadするタスクを作ってみた。その1(MySQL → S3) でした。
前回まででMySQLから抽出した結果をS3に配置すると言ったタスクを実装しました。 今回は前回S3に配置したファイルをRedshiftに取り込むタスクを実装していきます。
S3にからRedshiftへ取り込む
実装結果としては以下になります。
# -*- coding: utf-8 -*- from logging import getLogger, StreamHandler, DEBUG import luigi import luigi.s3 import luigi.contrib.mysqldb import contrib.redshift_ext import os import csv logger = getLogger(__name__) handler = StreamHandler() handler.setLevel(DEBUG) logger.setLevel(DEBUG) logger.addHandler(handler) try: import mysql from mysql.connector import errorcode except ImportError as e: logger.warning( "Loading MySQL module without the python package mysql-connector-python. \ This will crash at runtime if MySQL functionality is used.") class SQLFile(luigi.ExternalTask): (省略) class extractMySQLTableToS3(luigi.Task): (省略) class LoadS3toRedshift(contrib.redshift_ext.S3CopyWithIAMRole): sql_file = luigi.Parameter(default="employee.sql") redshift_host = luigi.Parameter() redshift_database = luigi.Parameter() redshift_password = luigi.Parameter() redshift_user = luigi.Parameter() redshift_copy_options = "region AS 'ap-northeast-1' DELIMITER AS '\t' TIMEFORMAT AS 'auto' IGNOREHEADER 1 GZIP" aws_account = luigi.Parameter() role_name = luigi.Parameter() def requires(self): return extractMySQLTableToS3() @property def host(self): return self.redshift_host @property def database(self): return self.redshift_database @property def user(self): return self.redshift_user @property def password(self): return self.redshift_password @property def table(self): return self.sql_file.replace(".sql", "") def s3_load_path(self): return self.input().path @property def iam_role_arn(self): iam_role_arn = "arn:aws:iam::{}:role/{}".format(self.aws_account, self.role_name) return iam_role_arn @property def copy_options(self): return self.redshift_copy_options @property def columns(self): with self.input().open('r') as file_in: for line in file_in: columns = [x for x in line.split('\t')] return columns if __name__ == '__main__': luigi.run()
実装した特徴としては以下になります。
- 以前実装したIAM Role対応のモジュールを用いて取り込みを行いました。そのためimportでcontrib.redshift_extを取り込んでいます。
- テーブル名はMySQLからS3に抽出する際に取り込んだファイル名をテーブル名としています。
- カラムの順番を調整する必要があったので、S3からファイルを取得し、取得したヘッダーをLoadする際のカラムとして指定をしています。
まとめ
ひとまずMySQLからRedshiftに取り込むようなタスクを実装できました。
明日は、今までのアドベントカレンダーを振り返ってみたいと思います。